package com.go.flink_code;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * Process function that keeps a running total in Flink operator (non-keyed) list state
 * and periodically reports that total from a background scheduler thread.
 *
 * <p>Lifecycle:
 * <ul>
 *   <li>{@link #initializeState} obtains the {@code "total-count"} list state, rebuilds
 *       {@link #totalCount} from it after a restore, and starts the periodic reporter.</li>
 *   <li>{@link #snapshotState} writes the current total into the list state on every checkpoint.</li>
 *   <li>{@link #close} stops the reporter so that no threads outlive the task.</li>
 * </ul>
 *
 * <p>Threading: the scheduler thread only reads the {@code volatile} {@link #totalCount} field;
 * the {@link ListState} handle is touched exclusively from the methods Flink itself invokes.
 *
 * Date: 2021/12/30
 * @author Cason
 */
public class Demo2_ProcessFunc extends ProcessFunction<String, String> implements CheckpointedFunction {

   /** Interval, in seconds, between two runs of {@link #query()}. */
   private static final long REPORT_INTERVAL_SECONDS = 60L;

   /** Operator list state holding the checkpointed total; assigned in {@link #initializeState}. */
   private transient ListState<Integer> listState;

   /** Single-threaded scheduler that runs {@link #query()}; created in {@link #initializeState}. */
   private transient ScheduledThreadPoolExecutor reporter;

   /** Current total. Volatile because it is read by the scheduler thread. */
   private volatile int totalCount = 0;

   /**
    * Obtains the operator list state, restores {@link #totalCount} from it when the job is
    * recovering from a checkpoint/savepoint, and then starts the periodic reporter.
    *
    * @param context gives access to the operator state store and tells whether state was restored
    * @throws Exception if the state cannot be obtained or read
    */
   @Override
   public void initializeState(FunctionInitializationContext context) throws Exception {
      // State that stores the aggregated result.
      ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<>("total-count", Integer.class);
      listState = context.getOperatorStateStore().getListState(listStateDescriptor);

      // Restore: sum every element handed to this subtask. After a rescale a subtask may
      // receive zero, one or several partial totals, so summing keeps the overall total intact.
      if (context.isRestored()) {
         int restored = 0;
         for (Integer partial : listState.get()) {
            if (partial != null) {
               restored += partial;
            }
         }
         totalCount = restored;
      }

      // Start the reporter only after the state has been restored, so the first run already
      // sees the recovered value. One daemon thread is enough for a single periodic task and
      // cannot keep the JVM alive if close() is never reached.
      reporter = new ScheduledThreadPoolExecutor(1, runnable -> {
         Thread thread = new Thread(runnable, "total-count-reporter");
         thread.setDaemon(true);
         return thread;
      });
      reporter.scheduleAtFixedRate(this::query, 0, REPORT_INTERVAL_SECONDS, TimeUnit.SECONDS);
   }

   /**
    * Replaces the content of the list state with the current total so that it becomes part
    * of the checkpoint being taken.
    *
    * @param snapshotContext metadata of the checkpoint in progress (not used)
    * @throws Exception if the state cannot be written
    */
   @Override
   public void snapshotState(FunctionSnapshotContext snapshotContext) throws Exception {
      listState.clear();
      listState.add(totalCount);
   }

   /**
    * Per-record hook. Currently does nothing: no record is counted or emitted.
    *
    * <p>NOTE(review): nothing in this class ever changes {@link #totalCount} besides a restore;
    * the counting rule presumably belongs here - confirm the intended logic.
    */
   @Override
   public void processElement(String value, Context ctx, Collector<String> out) throws Exception {

   }

   /**
    * Stops the periodic reporter. Without this the scheduler thread would survive task
    * cancellation and every restart would leak another one.
    *
    * @throws Exception if the superclass close fails
    */
   @Override
   public void close() throws Exception {
      if (reporter != null) {
         reporter.shutdownNow();
         reporter = null;
      }
      super.close();
   }

   /**
    * Periodic task executed on the scheduler thread. Reads the current total from the
    * volatile field instead of the {@link ListState}, so the state handle is never accessed
    * from outside the methods Flink calls.
    */
   private void query(){
      try {
         int currentTotal = totalCount;

         // Write currentTotal to MySQL (not implemented yet).

      } catch (Exception e) {
         // An exception escaping a scheduleAtFixedRate task suppresses all later runs,
         // so failures are contained here and reporting continues on the next tick.
         e.printStackTrace();
      }
   }

}
